跳到主要内容

Java 集合类学习 ConcurrentHashMap

ConcurrentHashMap 常见问题

是怎么保证线程安全的?

jdk1.7:Segment+HashEntry来进行实现的;

jdk1.8:放弃了Segment臃肿的设计,采用Node+CAS+Synchronized来保证线程安全;

1.7和1.8的区别

jdk1.8的实现降低锁的粒度,jdk1.7锁的粒度是基于Segment的,包含多个HashEntry,而jdk1.8锁的粒度就是Node

数据结构:jdk1.7 Segment+HashEntry;jdk1.8 数组+链表+红黑树+CAS+synchronized

默认初始容量是多少?

这个和 HashMap 一样都是 16

key,value是否可以为null?

不行 如果key或者value为null会抛出空指针异常

每次扩容是原来容量的几倍

2倍 在 transfer 方法里面会创建一个原数组的俩倍的 node 数组来存放原数据。这个和 HashMap 是一样的

每个节点是什么样的,有哪些变量

它实现 Map.Entry<K,V> 接口。里面存放了 hash,key,value,以及next节点。与 HashMap 不同之处在于它的 value 和 next 节点是用 volatile 进行修饰,可以保证多线程之间的可见性。

get方法是否要加锁,为什么?

由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。所以无需加锁

put过程是怎样的?

整体流程跟 HashMap 比较类似,大致是以下几步:

1.7 的插入

  1. 先使用自旋锁来尝试获取锁,如果重试的次数达到了最大重试次数则改为阻塞锁获取。
  2. 如果桶数组未初始化,则初始化;
  3. 如果待插入的元素所在的桶为空,则尝试把此元素直接插入到桶的第一个位置;
  4. 如果正在扩容,则当前线程一起加入到扩容的过程中;
  5. 如果待插入的元素所在的桶不为空且不在迁移元素,则锁住这个桶(分段锁);
  6. 如果当前桶中元素以链表方式存储,则在链表中寻找该元素或者插入元素;
  7. 如果当前桶中元素以红黑树方式存储,则在红黑树中寻找该元素或者插入元素;
  8. 如果元素存在,则返回旧值;
  9. 如果元素不存在,整个Map的元素个数加1,并检查是否需要扩容;

1.8 的插入

  1. 如果桶数组未初始化,则初始化;
  2. 如果待插入的元素所在的数组位置为空,则尝试把此元素直接插入到该位置上(CAS,如果 CAS 失败会到第 4 步);
  3. 如果正在扩容,则当前线程一起加入到扩容的过程中;
  4. 如果待插入的元素所在的数组位置不为空且不在迁移元素,则锁住这个节点(synchronized);
  5. 如果当前桶中元素以链表方式存储,则在链表中寻找该元素或者插入元素;
  6. 如果当前桶中元素以红黑树方式存储,则在红黑树中寻找该元素或者插入元素;
  7. 如果元素存在,则返回旧值;
  8. 如果元素不存在,整个Map的元素个数加1,并检查是否需要扩容;

添加元素操作中使用的锁主要有(自旋锁 + CAS + synchronized + 分段锁)。

get过程是怎样的?

1、计算 hash 值 2、根据 hash 值找到数组对应位置: (n - 1) & h 3、根据该位置处结点性质进行相应查找

如果该位置为 null,那么直接返回 null 就可以了 如果该位置处的节点刚好就是我们需要的,返回该节点的值即可

如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树,后面我们再介绍 find 方法如果以上 3 条都不满足,那就是链表,进行遍历比对即可

ConcurrentHashMap 是什么?

ConcurrentHashMap 是并发效率更高的 Map,用来替换其他线程安全的 Map 容器,比如 Hashtable 和 Collections.synchronizedMap

ConcurrentHashMap 同样也分为 1.7 、1.8 版,两者在实现上略有不同。

ConcurrentHashMap 使用示例

public class ConcurrentHashMapDemo {

private final ConcurrentHashMap<Integer, String> conHashMap = new ConcurrentHashMap<>();

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(3);
ConcurrentHashMapDemo ob = new ConcurrentHashMapDemo();
// 这个 ob.new WriteThreadOne() 是创建内部子类(这里无法使用静态类,因为要调用父类的 conHashMap 方法)
service.execute(ob.new WriteThreadOne());
service.execute(ob.new WriteThreadTwo());
service.execute(ob.new ReadThread());
service.shutdownNow();
}

class WriteThreadOne implements Runnable {
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
conHashMap.putIfAbsent(i, "A" + i);
}
}
}

class WriteThreadTwo implements Runnable {
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
conHashMap.put(i, "B" + i);
}
}
}

class ReadThread implements Runnable {
@Override
public void run() {
for (Integer key : conHashMap.keySet()) {
System.out.println(key + " : " + conHashMap.get(key));
}
}
}
}

检查打印结果:

1 : B1
2 : B2
3 : B3
4 : B4
5 : B5
6 : A6
7 : A7
8 : A8
9 : A9
10 : A10

可以发现不管打印多少次,前 5 个都是 B

JDK1.7 中的结构

先来看看 1.7 的实现,下面是他的结构图:

细节:

如图所示,是由 Segment 数组、HashEntry 组成,和 HashMap 一样,仍然是数组加链表。

ConcurrentHashMap 实现线程安全的主要手段,具体一点就是 Segment + HashEntry + ReentrantLock。简单来说,ConcurrentHashMap 是一个 Segment 数组(默认长度为16),每个 Segment 又包含了一个 HashEntry 数组,所以可以看做一个 HashMap, Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 Segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。

它的核心成员变量:

/**
* Segment 数组,存放数据时首先需要定位到具体的 Segment 中。
*/
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;

Segment 是 ConcurrentHashMap 的一个内部类,主要的组成如下:

static final class Segment<K, V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;

// 和 HashMap 中的 HashEntry 作用一样,真正存放数据的桶
transient volatile HashEntry<K, V>[] table;
transient int count;
transient int modCount;
transient int threshold;
final float loadFactor;
}

可以看到这个 Segment 如其名那样存储一段 HashEntry 数组,再来看看其中 HashEntry 的组成:

static class HashEntry<K,V> {
final int hash;
final K key;
volatile V val;
volatile HashEntry<K,V> next;

HashEntry(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
//...

和 HashMap 非常类似,唯一的区别就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。

原理上来说:ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel(Segment 数组数量)的线程并发。每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。

下面也来看看核心的 put get 方法。

put 方法

public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();

int hash = hash(key);

int j = (hash >>> segmentShift) & segmentMask;

// 根据上面的 hash 取得 Segment
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j); // in ensureSegment

// 把数据 put 到这个目标 Segment 里面
return s.put(key, hash, value, false);
}

首先是通过 key 定位到 Segment,之后在对应的 Segment 中进行具体的 put。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);

V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

虽然 HashEntry 中的 value 是用 volatile 关键词修饰的,但是并不能保证并发的原子性,所以 put 操作时仍然需要加锁处理。

首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁。

这个 scanAndLockForPut() 方法

  1. 尝试自旋获取锁。
  2. 如果重试的次数达到了 MAX_SCAN_RETRIES 则改为阻塞锁获取,保证能获取成功。

再结合图看看 put 的流程。

  1. 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry。
  2. 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value。
  3. 不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容。
  4. 最后会解除在 1 中所获取当前 Segment 的锁

get 方法

public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

get 逻辑比较简单:

只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。

由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。

ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁。

JDK1.8 中的结构 🔥

1.7 已经解决了并发问题,并且能支持 N 个 Segment 这么多次数的并发,但依然存在 HashMap 在 1.7 版本中的问题。

那就是查询遍历链表效率太低。

因此 1.8 做了一些数据结构上的调整。

首先来看下底层的组成结构:

看起来是不是和 1.8 HashMap 结构类似?

JDK 1.8 版本 摒弃了之前版本中较为臃肿的 Segment 分段锁设计,取而代之的是 Node 数组 + CAS + synchronized + volatile 的新设计。这样一来,ConcurrentHashMap 不仅数据结构变得更简单了(与JDK 1.8 的 HashMap 类似),锁的粒度也更小了,锁的单位从 Segment 变成了 Node 数组中的桶(科普:桶就是指数组中某个下标位置上的数据集合,这里可能是链表,也可能是红黑树)。 说到红黑树,必须提一下,在JDK 1.8 的 HashMap 和 ConcurrentHashMap 中,如果某个数组位置上的链表长度过长(大于等于8),就会转化为红黑树以提高查询效率

说白了就是 synchronized 只锁定当前链表或红黑二叉树的 首节点,这样只要 hash 不冲突,就不会产生并发,效率又提升 N 倍。

数据结构跟 HashMap1.8 的结构类似,数组 + 链表/红黑二叉树。Java 8 在链表长度超过一定阈值(8)时将链表(寻址时间复杂度为 O(N)O(N) )转换为红黑树(寻址时间复杂度为 O(log(N))O(log(N))

static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
// ...

也将 1.7 中存放数据的 HashEntry 改为 Node,但作用都是相同的。

其中的 val next 都用了 volatile 修饰,保证了可见性。

put 方法

重点来看看 put 函数:

  1. 第一次 put 元素会初始化 Node 数组 (initTable)
  2. put 操作又分为 key (hash 碰撞) 存在时的插入和 key 不存在时的插入
  3. put 操作可能会引发数组扩容 (tryPresize) 和链表转红黑树 (treeifyBin)
  4. 扩容会使用到数据迁移方法 (transfer)
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 得到 hash 值
int hash = spread(key.hashCode());
// 用于记录相应链表的长度
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组"空",进行数组初始化
if (tab == null || (n = tab.length) == 0)
// 表的初始化,这里不展开了,核心思想是使用sizeCtl的变量和CAS操作进行控制,保证数组在扩容时
// 不会创建出多余的表
tab = initTable();
// 找该 hash 值对应的数组下标,得到第一个节点 f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果数组该位置为空,用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了
// 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了(循环的意思是 CAS 在执行失败后会进行重试)
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 帮助数据迁移,
tab = helpTransfer(tab, f);
else { // 到这里就是说,f 是该位置的头结点,而且不为空
V oldVal = null;
// 获取数组该位置的头结点锁对象
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
// 用于累加,记录链表的长度
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了链表的最末端,将这个新值放到链表的最后面
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 红黑树
Node<K,V> p;
binCount = 2;
// 调用红黑树的插值方法插入新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
if (binCount >= TREEIFY_THRESHOLD)
// 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换,
// 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

补充:CAS 操作简要介绍

CAS 操作是新版本 ConcurrentHashMap 线程安全实现原理的精华所在,如果说其共享变量的读取全靠 volatile 实现线程安全的话,那么存储和修改主要是靠 CAS 操作实现线程安全的。(扩容这种费时的操作使用 synchronized 关键字修饰)

// CAS操作的提供者
private static final sun.misc.Unsafe U;

// 以下是 put 方法里用到 CAS 操作的代码片段
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}

// tabAt 方法通过 Unsafe.getObjectVolatile() 的方式获取数组对应 index 上的元素,
// getObjectVolatile 作用于对应的内存偏移量上,是具备 volatile 内存语义的。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// 如果获取的是空,尝试用 CAS 的方式在数组的指定 index 上创建一个新的 Node。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

在 ConcurrentHashMap 中,数组初始化、插入删除元素、扩容、数据迁移以及链表和红黑树的转换等过程都会涉及到线程安全问题,而相关的方法中实现线程安全的思想是一致的:对桶中的数据进行添加或修改操作时会用到 synchronized 关键字,也就是获得该位置上头节点对象的锁,保证线程安全,另外就是用到了大量的 CAS 操作。

以上就是对这三种 Map 的线程安全原理的简要介绍。

get 方法

public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); //计算hash
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//读取头节点的Node元素
if ((eh = e.hash) == h) { //如果该节点就是首节点就返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//hash 值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来
//eh 代表头节点的hash值
// eh=-1,说明该节点是一个ForwardingNode,正在迁移,此时调用ForwardingNode的find方法去nextTable里找。
// eh=-2,说明该节点是一个TreeBin,此时调用TreeBin的find方法遍历红黑树,由于红黑树有可能正在旋转变色,所以find里会有读写锁。
// eh>=0,说明该节点下挂的是一个链表,直接遍历该链表即可。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {//既不是首节点也不是ForwardingNode,那就往下遍历
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
  1. 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
  2. 如果是红黑树那就按照树的方式获取值。
  3. 就不满足那就按照链表的方式遍历获取值。

为什么 get 操作不需要加锁呢?

因为上面提到了使用 volatile 关键字已经足以保证线程在读取数据时不会读取到脏数据,所以没有加锁的必要。

1.8 在 1.7 的数据结构上做了大的改动,采用红黑树之后可以保证查询效率 O(logn)O(logn) ,甚至取消了 ReentrantLock 改为了 synchronized,这样可以看出在新版的 JDK 中对 synchronized 优化是很到位的。

补充:如何使 HashMap 线程安全?

我们都知道。HashMap 是非线程安全的(非同步的)。那么怎么才能让 HashMap 变成线程安全的呢?

synchronizedMap 是一个方法,HashMap 本身非线程安全的,但是当使用 Collections.synchronizedMap(new HashMap()) 进行包装后就返回一个线程安全的 Map。

点进这个 synchronizedMap 方法内部,可以发现它创建了一个 SynchronizedMap 对象

public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
return new SynchronizedMap<>(m);
}

来看 SynchronizedMap 的主要方法

public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}

从源码可以看出,SynchronizedMap 实现线程安全的方法也是比较简单的,所有方法都是先对锁对象 mutex 上锁,然后再直接调用 Map 类型成员变量 m 的相关方法。

这样一来,线程在执行方法时,只有先获得了 mutex 的锁才能对 m 进行操作。因此,跟 Hashtable 一样,在同一个时间点,只能有一个线程对 SynchronizedMap 对象进行操作,虽然保证了线程安全,却导致了性能低下。

它最大的好处就是传入的是一个 Map 接口,而不是具体的实现类,所以像 TreeMap 这种 Map 实现类就可以通过这个方法来生成一个线程安全的 Map

synchronizedMap(Map) 和 ConcurrentHashMap 区别 ?

ConcurrentHashMap 是 Java 1.5 中 Hashtable 的替代品,是并发包的一部分。使用 ConcurrentHashMap,不仅可以在并发多线程环境中安全地使用它,而且还提供比 Hashtable 和 synchronizedMap 更好的性能。

但是对于没有给出线程安全却需要线程安全的 Map 实现类就可以使用 synchronizedMap(Map) 来创建了

除此之外还有其它的并发工具,但是它们工作原理都是大同小异的

synchronizedCollection(Collection<T>  c) //返回指定 collection 支持的同步(线程安全的)collection。
synchronizedList(List<T> list)//返回指定列表支持的同步(线程安全的)List。
synchronizedMap(Map<K,V> m) //返回由指定映射支持的同步(线程安全的)Map。
synchronizedSet(Set<T> s) //返回指定 set 支持的同步(线程安全的)set。

Reference

Hashtable,Collections.SynchronizedMap和ConcurrentHashMap线程安全实现原理的区别以及性能测试 HashMap? ConcurrentHashMap? 相信看完这篇没人能难住你!